<?php
/**
 * Created by PhpStorm.
 * User: zxqc2018
 * Date: 2018/12/24
 * Time: 15:52
 */

namespace App\Service\Amqp;


use App\Service\ErrorCode;
use App\Service\ResultData;
use App\Model\Queue\MqProcessErrorLog;
use App\Model\Queue\MqProcessLog;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\HttpKernel\Exception\HttpException;
use Throwable;

/**
 * rabbitMq 抽象父类
 * Class AbstractRabbitMq
 * @package App\Service\Amqp
 * @author zxqc2018
 */
abstract class AbstractRabbitMq implements AmqpInterface
{
    //消费者同时处理消息的最大个数
    const PREFETCH_COUNT = 3;
    //数据库中关键词的key   sendMsg data 中的 key
    const MSG_FIND_KEYWORD_STR = 'msg_fk_str';
    //队列优先级配置 fast key
    const QUEUE_PRIORITY_FAST_KEY = 'fast';
    //队列优先级配置 middle key
    const QUEUE_PRIORITY_MIDDLE_KEY = 'middle';
    //队列优先级配置 slow key
    const QUEUE_PRIORITY_SLOW_KEY = 'slow';
    //消息内容 消息ID key
    const MSG_ID_KEY = 'mq_msg_id';
    //消息重发上限
    const RESEND_TIME_LIMIT = 3;

    //测试 fast routeKey
    const DEMO_FAST_TEST = 'demo.fast.test';
    //测试 slow routeKey
    const DEMO_SLOW_TEST = 'demo.slow.test';

    //mq配置名字  config/amqp.php中
    protected $configName;
    //默认交换机名
    protected $defaultExchangeName;
    //快中慢-队列优先级配置
    protected $queuePriorityConfig = [];
    //routeKey对应的处理方法数组   key 为routeKey  value [对象, 对象方法]|匿名方法
    protected $routeKeyProcessFunc = [];
    /**
     * @var AMQPStreamConnection
     */
    protected $conn;

    /**
     * @var AMQPChannel
     */
    protected $channel;

    /**
     * AbstractRabbitMq constructor.
     */
    public function __construct()
    {
        $this->connection();
    }

    /**
     * 获取队列优先级配置信息
     * @param null|string $priorityKey 优先级key
     * @param int $type 0 队列名 1 队列的绑定routeKey
     * @return array|mixed|string
     * @author zxqc2018
     */
    protected function getQueuePriorityConfig($priorityKey = null, $type = 0)
    {
        if (is_null($priorityKey)) {
            return $this->queuePriorityConfig;
        }

        $queueConfigInfo = $this->queuePriorityConfig[$priorityKey] ?? [];

        return $queueConfigInfo[$type] ?? '';
    }

    /**
     * 取得消息类型配置
     * @param int $type 类型 0  返回数组的key 是 routeKey 1 反之
     * @return array
     * @author zxqc2018
     */
    protected function getMsgRouteKeyConfig($type = 0)
    {
        $config = [
            self::DEMO_FAST_TEST => 99,
            self::DEMO_SLOW_TEST => 98,
        ];

        if ($type == 1) {
            $config = array_flip($config);
        }
        return $config;
    }

    /**
     * routeKey与msgType相互转化[根据值自动判断]
     * @param string|int $value 路由key|msgType数字
     * @return int|string
     * @author zxqc2018
     */
    public function routeKey2MsgType($value)
    {
        $defaultValue = 0;
        $config = $this->getMsgRouteKeyConfig();
        if (is_numeric($value)) {
            $config = $this->getMsgRouteKeyConfig(1);
            $defaultValue = '';
        }

        return $config[$value] ?? $defaultValue;
    }

    /**
     * 包裹查询 keyword 到 data
     * @param array $data 发送消息数据
     * @param mixed $findKeyword 消息的查询关键词
     * @return array
     * @author zxqc2018
     */
    public static function wrapFindKeyword(array $data, $findKeyword)
    {
        $data[self::MSG_FIND_KEYWORD_STR] = $findKeyword;
        return $data;
    }
    /**
     * 链接mq服务器
     * @author zxqc2018
     */
    protected function connection()
    {
        //获取MQ配置
        $conf = config("amqp.{$this->configName}");

        try {
            $this->conn = new AMQPStreamConnection(
                $conf['host'], $conf['port'], $conf['user'], $conf['pwd'], $conf['vhost']
            );
            $this->channel = $this->conn->channel();
        } catch (Throwable $e) {
            resultData([], ErrorCode::ERROR_RABBIT_MQ, exceptionMessageShow($e, 'MQ连接失败'))->withException();
        }
    }

    /**
     * 队列监听处理方法
     * @param string $queueName 队列名
     * @param array|callable $processFunc 执行方法   匿名函数|[对象,对象方法]
     * @return mixed
     * @author zxqc2018
     * @throws \ErrorException
     */
    function msgListenerProcess($queueName, $processFunc)
    {
        $consumerTag = 'consumer-' . rand(1, 999);
        $this->channel->queue_declare($queueName, false, true, false, false);

        //生成匿名方法封装固定操作
        $callback = function (AMQPMessage $msg) use ($processFunc){
            $tmpDataArr = mixedTwoWayOpt($msg->body, 2);

            if (empty($tmpDataArr) || empty($tmpDataArr[self::MSG_ID_KEY])) {
                return resultData([], ErrorCode::ERROR_RABBIT_MQ, '消息ID不存在');
            }
            $msgId = $tmpDataArr[self::MSG_ID_KEY];
            $msgSqlData = MqProcessLog::getInstance()->findOne('id', $msgId, 'id,msg_str,msg_type,find_keyword,process_num,process_status');
            $updateData = [
                'process_start_time' => time(),
            ];
            try {
                $processResultData = call_user_func_array($processFunc, [$msgSqlData]);
                if ($processResultData instanceof ResultData) {
                    if ($processResultData->getCode() == 0) {
                        $updateData['process_status'] = 1;
                    } else {
                        $updateData['process_status'] = 2;
                    }
                    $updateData['process_msg'] = $processResultData->getMessage();
                } else {

                    //不关心成功与否消息 默认成功
                    $updateData['process_msg'] = 'success';
                    $updateData['process_status'] = 1;

                    //兼容bool返回值统计
                    if (is_bool($processResultData)) {
                        if (!$processResultData) {
                            $updateData['process_msg'] = 'fail';
                            $updateData['process_status'] = 2;
                        }
                    }
                }
            } catch (HttpException $e) {
                $updateData['process_status'] = 2;
                $updateData['process_msg'] = $e->getMessage();
            }catch (Throwable $e) {
                $updateData['process_status'] = 2;
                $updateData['process_msg'] = $e->getMessage() . '|' . $e->getLine() . '|' . $e->getFile();
            }
            $updateData['process_num'] = $msgSqlData['process_num'] + 1;
            $updateData['process_end_time'] = time();

            MqProcessLog::getInstance()->simpleUpdate(['id' => $msgId], $updateData);

            if ($updateData['process_status'] == 2) {
                MqProcessErrorLog::getInstance()->simpleCreate([
                    'msg_id' => $msgId,
                    'process_msg' => $updateData['process_msg'],
                    'create_time' => time(),
                ]);
            }

            //默认握手成功  成功失败情况数据库查看情况
            $msg->delivery_info['channel']->basic_ack(
                $msg->delivery_info['delivery_tag']);
        };

        //设置消费者同时处理消息的最大个数
        //由于这里我们通过数据库来记录执行成功与否,每次都握手情况下,这个参数其实没效果
        $this->channel->basic_qos(null, self::PREFETCH_COUNT, null);
        $this->channel->basic_consume($queueName, $consumerTag, false, false, false, false, $callback);


        while(count($this->channel->callbacks)) {
            $this->channel->wait();
        }

        $this->channel->close();
        $this->conn->close();
    }

    /**
     * 根据优先级来处理默认的消息回调支持3种类型
     * @param string $priorityStr 优先级字符串  fast|middle|slow
     * @author zxqc2018
     * @throws \ErrorException
     */
    public function listenerProcess($priorityStr)
    {
        $queueName = $this->getQueuePriorityConfig($priorityStr);

        if (empty($queueName)) {
            resultData([], ErrorCode::ERROR_RABBIT_MQ, '此优先级不存在')->withException();
        }

        //监听消息请求
        $this->msgListenerProcess($queueName, [$this, 'msgProcess']);
    }
    /**
     * 发送消息
     * @param array $data 消息内容
     * @param array|string $exchangeParam 交换机配置 [名字, 交换机类型 direct|fanout|topic]
     * @param array|string $queueParam 队列配置 |[名字, 队列绑定的路由键]
     * @param string $routeKey 消息的路由键
     * @return ResultData
     * @author zxqc2018
     */
    public function sendMsg($data, $exchangeParam, $queueParam, $routeKey)
    {
        $res = resultData();
        $checkData[] = $exchangeName = $exchangeParam[0] ?? '';
        $checkData[] = $exchangeType = $exchangeParam[1] ?? '';
        $checkData[] = $queueName = $queueParam[0] ?? '';
        $checkData[] = $queueRouteKey = $queueParam[1] ?? '';

        if (count(array_filter($checkData)) < 4) {
            resultData([], ErrorCode::ERROR_RABBIT_MQ, '消息创建失败')->withException();
        }

        //是否是重发 根据 data是否包含msg_id_key 来判断
        if (isset($data[self::MSG_ID_KEY])) {
            $mqProcessInfo = MqProcessLog::getInstance()->findOne(['id' => $data[self::MSG_ID_KEY], 'process_status' => 2]);
            if (empty($mqProcessInfo)) {
                resultData([], ErrorCode::ERROR_RABBIT_MQ, '消息未失败或不存在,无法重发')->withException();
            }

            if ($mqProcessInfo['process_num'] >=  self::RESEND_TIME_LIMIT) {
                resultData([], ErrorCode::ERROR_RABBIT_MQ, '消息重发次数到达上限--' . self::RESEND_TIME_LIMIT)->withException();
            }

            //更新执行情况为未执行
            MqProcessLog::getInstance()->simpleUpdate(['id' => $data[self::MSG_ID_KEY]], ['process_status' => 0]);
        } else {
            $curTime = time();
            $msgFindKeyword = '';
            //获取查询关键词
            if (isset($data[self::MSG_FIND_KEYWORD_STR])) {
                $msgFindKeyword = $data[self::MSG_FIND_KEYWORD_STR];
                unset($data[self::MSG_FIND_KEYWORD_STR]);
            }

            $msgStr = mixedTwoWayOpt($data, 2, true);

            //消息信息存入数据库记录执行状态
            $insertData = [
                'msg_str' => $msgStr,
                'msg_type' => $this->routeKey2MsgType($routeKey),
                'create_time' => $curTime,
                'find_keyword' => $msgFindKeyword,
            ];

            //入库并且发送写入msg_id  发送消息给消费者
            $mqProcessInfo = MqProcessLog::getInstance()->simpleCreate($insertData);
        }
        if (is_null($mqProcessInfo)) {
            $res =  resultData([], ErrorCode::ERROR_RABBIT_MQ, '消息ID生成失败');
        } else {
            try {
                //把插入后的消息ID放入消息body中
                $msg = new AMQPMessage(mixedTwoWayOpt([self::MSG_ID_KEY => $mqProcessInfo['id']], 2, true), ['delivery_mode' => 2,]);
                //设置交换机
                $this->channel->exchange_declare($exchangeName, $exchangeType, false, true);

                //设置队列
                $this->channel->queue_declare($queueName, false, true, false, false);
                $this->channel->queue_bind($queueName, $exchangeName, $queueRouteKey);

                $this->channel->basic_publish($msg, $exchangeName, $routeKey);

            }catch (\Throwable $e) {
                $res = resultData([], ErrorCode::ERROR_RABBIT_MQ, '消息发送失败');
            }
        }

        return $res;
    }

    /**
     * 根据routeKey获取对应的队列配置
     * @param string $routeKey 路由key
     * @return array|mixed
     * @author zxqc2018
     */
    protected function getQueuePriorityInfo($routeKey)
    {
        $routeKeyPartArr = myExplode($routeKey, '.');
        return $this->queuePriorityConfig[$routeKeyPartArr[1] ?? ''] ?? [];
    }

    /**
     * 发送优先级定义消息
     * @param array $data 消息数据
     * @param string $routeKey 路由key
     * @return mixed
     * @author zxqc2018
     */
    function sendPriorityMsg($data, $routeKey)
    {
        $exchangeParam= [$this->defaultExchangeName, 'topic'];
        $queueParam = $this->getQueuePriorityInfo($routeKey);
        return $this->sendMsg($data, $exchangeParam, $queueParam, $routeKey);
    }

    /**
     * 重新发送优先级定义消息
     * @param string|int $msgType 消息类型|路由key
     * @param int $msgId 消息ID
     * @return mixed
     * @author zxqc2018
     */
    function resendPriorityMsg($msgType, $msgId)
    {
        if (!is_numeric($msgType)) {
            $msgType = $this->routeKey2MsgType($msgType);
        }
        $resendWhere = [
          'msg_type' => $msgType,
          'process_status' => 2,
        ];

        //指定消息ID
        if (!empty($msgId)) {
            $resendWhere['id'] = $msgId;
        }

        $mqProcessInfo = MqProcessLog::getInstance()->findOne($resendWhere);

        //重发消息
        if (!empty($mqProcessInfo)) {
            $this->sendPriorityMsg([self::MSG_ID_KEY => $msgId], $this->routeKey2MsgType($mqProcessInfo['msg_type']));
        }
    }

    /**
     * 处理消息方法
     * @param array msgData mq_process_log 行数据
     * @return mixed
     * @author zxqc2018
     */
    function msgProcess(array $msgData)
    {
        $routKey = $this->routeKey2MsgType($msgData['msg_type']);
        //调用各自的routeKey process setting
        $this->settingRouteKeyProcessFunc();

        if (isset($this->routeKeyProcessFunc[$routKey])) {
            $res = call_user_func_array($this->routeKeyProcessFunc[$routKey], [$msgData]);
        } else {
            $res = resultData([], ErrorCode::ERROR_RABBIT_MQ, 'routeKey处理方法不存在');
        }
        return $res;
    }
}
